package com.jdk.demo.nio.mReactorThread;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class Reactor implements Runnable {

    final Selector selector;
    final ServerSocketChannel serverSocket;

    Reactor(int port) throws IOException { //Reactor初始化
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(new InetSocketAddress(port));
        //非阻塞
        serverSocket.configureBlocking(false);

        //分步处理,第一步,接收accept事件
        SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        //attach callback object, Acceptor
        sk.attach(new MultiWorkThreadAcceptor());
    }

    @Override
	public void run() {
        try {
            while (!Thread.interrupted()) {
                selector.select();
                Set<SelectionKey> selected = selector.selectedKeys();
                Iterator<SelectionKey> it = selected.iterator();
                SelectionKey key = null;
                while (it.hasNext()) {
                    // 主Reactor只负责收到连接
                	key = (it.next());
                    it.remove();
                    dispatch(key);
                }
                selected.clear();
            }
        } catch (IOException ex)
        { /* ... */ }
    }

    void dispatch(SelectionKey k) {
        Runnable r = (Runnable) (k.attachment());
        //调用之前注册的callback对象
        if (r != null) {
            r.run();
        }
    }

    // inner class
    class MultiWorkThreadAcceptor implements Runnable {
    	// cpu线程数相同多work线程
        int workCount = 2;
        SubReactor[] workThreadHandlers = new SubReactor[workCount];
        volatile int nextHandler = 0;

        public MultiWorkThreadAcceptor() {
            this.init();
        }

        public void init() {
            nextHandler = 0;
            for (int i = 0; i < workThreadHandlers.length; i++) {
                try {
                	workThreadHandlers[i] = new SubReactor(i);
                    new Thread(workThreadHandlers[i]).start();
                } catch (Exception e) {

                }
            }
        }

    	@Override
		public void run() {
            try {
            	// 主selector负责accept
                SocketChannel channel = serverSocket.accept();
                if (channel != null) {// 注册读写
                    synchronized (channel) {
                    	// 顺序获取SubReactor，然后注册channel
                        SubReactor work = workThreadHandlers[nextHandler];
                        try {
							work.registerChannel(channel);
						} catch (Exception e) {
							e.printStackTrace();
						}
                        nextHandler++;
                        if (nextHandler >= workThreadHandlers.length) {
                            nextHandler = 0;
                        }
                    }
                }
            } catch (IOException ex)
            { /* ... */ }
        }
    }

}